import argparse
import gc
import logging
import math
import subprocess
import tempfile
from pathlib import Path
from typing import List

import torch
from datasets import load_dataset

from optimum.executorch import (
    ExecuTorchModelForCausalLM,
    ExecuTorchModelForImageClassification,
    ExecuTorchModelForMaskedLM,
    ExecuTorchModelForSeq2SeqLM,
    ExecuTorchModelForSpeechSeq2Seq,
)
from transformers import (
    AutoConfig,
    AutoModelForCausalLM,
    AutoModelForImageClassification,
    AutoProcessor,
    AutoTokenizer,
)


def cli_export(command, model_dir):
    """Run an export command after checking that the output directory is usable.

    Args:
        command: Argument list handed to ``subprocess.run`` (no shell involved).
        model_dir: Directory the export is expected to write into. It may be
            missing or empty, but must not be a regular file or a non-empty
            directory.

    Raises:
        Exception: If ``model_dir`` exists and is not a directory, or is a
            non-empty directory.
        subprocess.CalledProcessError: If the export command exits with a
            non-zero status.
    """
    p = Path(model_dir)
    if p.exists():
        if not p.is_dir():
            raise Exception(f"Path {model_dir} already exists and is not a directory.")
        if any(p.iterdir()):
            raise Exception(
                f"Existing directory {model_dir} is non-empty. Please remove it first."
            )
    try:
        subprocess.run(command, check=True)
    except subprocess.CalledProcessError as e:
        print(f"Export failed with error: {e}")
        # Propagate the failure: callers load the exported artifact right
        # after this call, so continuing would only fail later with a less
        # informative error.
        raise
    print("Export completed successfully.")


def check_causal_lm_output_quality(
    model_id: str,
    generated_tokens: torch.Tensor,
    max_perplexity_threshold: float = 100.0,
) -> bool:
    """
    Evaluates the quality of text generated by a causal language model by calculating its perplexity.

    The eager model identified by ``model_id`` is loaded in bfloat16 and run once
    with ``generated_tokens`` as both inputs and labels; perplexity is the
    exponential of the returned loss.

    Args:
        model_id: HuggingFace model identifier (e.g., "google/gemma2-2b")
        generated_tokens: Token ids produced by the exported model. Callers in
            this file pass the ``return_tensors="pt"`` output of a tokenizer.
        max_perplexity_threshold: Maximum acceptable perplexity (lower is better)

    Returns:
        bool: True when the perplexity is at most ``max_perplexity_threshold``,
        False otherwise.
    """
    logging.info(f"Starting perplexity check with model '{model_id}' ...")
    # Pick the model class; llava checkpoints use a dedicated class.
    model_cls = AutoModelForCausalLM
    if "llava" in model_id:
        from transformers import LlavaForConditionalGeneration

        model_cls = LlavaForConditionalGeneration
    try:
        model = model_cls.from_pretrained(
            model_id,
            low_cpu_mem_usage=True,
            use_cache=False,
            torch_dtype=torch.bfloat16,
        )
    except TypeError:
        # Retry without `use_cache` — presumably some model classes reject
        # that keyword; confirm against the transformers version in use.
        model = model_cls.from_pretrained(
            model_id,
            low_cpu_mem_usage=True,
            torch_dtype=torch.bfloat16,
        )

    with torch.no_grad():
        outputs = model(input_ids=generated_tokens, labels=generated_tokens)

    # Get the loss (negative log-likelihood)
    loss = outputs.loss.item()

    # Calculate perplexity (exp of the average negative log-likelihood).
    # math.exp raises OverflowError for very large inputs; treat that as an
    # infinitely bad perplexity so the check fails instead of crashing.
    try:
        perplexity = math.exp(loss)
    except OverflowError:
        perplexity = math.inf

    is_quality_ok = perplexity <= max_perplexity_threshold
    if is_quality_ok:
        logging.info(
            f"✓ Perplexity check passed: {perplexity:.2f} <= {max_perplexity_threshold}"
        )
    else:
        logging.warning(
            f"✗ Perplexity check failed: {perplexity:.2f} > {max_perplexity_threshold}"
        )

    # Clean up immediately
    del model
    del outputs
    gc.collect()

    return is_quality_ok


def test_text_generation(model_id, model_dir, recipe, *, quantize=True, run_only=False):
    """Export a causal LM for text generation and check its output quality.

    Builds the ``optimum-cli export executorch`` command for ``model_id``,
    runs it through ``cli_export`` unless ``run_only`` is set, generates text
    from a fixed prompt with the exported model in ``model_dir``, and asserts
    that ``check_causal_lm_output_quality`` accepts the generated tokens.

    Args:
        model_id: HuggingFace model identifier to export and to load the
            tokenizer from.
        model_dir: Directory holding (or receiving) the exported model.
        recipe: Export recipe name; "xnnpack" and "coreml" substrings select
            recipe-specific flags.
        quantize: Whether to add quantization flags. Asserted to be false for
            recipes that are neither xnnpack nor coreml.
        run_only: Skip the export step and only run the exported model.
    """
    export_cmd = ["optimum-cli", "export", "executorch"]
    export_cmd.extend(["--model", model_id])
    export_cmd.extend(["--task", "text-generation"])
    export_cmd.extend(["--recipe", recipe])
    export_cmd.extend(["--output_dir", model_dir])

    # Recipe-specific flags; xnnpack takes precedence when both names match.
    if "xnnpack" in recipe:
        export_cmd.extend(["--use_custom_sdpa", "--use_custom_kv_cache"])
        quant_flags = ["--qlinear", "8da4w", "--qembedding", "8w"]
    elif "coreml" in recipe:
        export_cmd.append("--disable_dynamic_shapes")
        quant_flags = ["--qlinear", "4w", "--qembedding", "8w"]
    else:
        assert (
            not quantize
        ), "Quantization is only supported for XnnPack and CoreML recipes at the moment."
        quant_flags = []

    if quantize:
        export_cmd.extend(quant_flags)

    if not run_only:
        cli_export(export_cmd, model_dir)

    tokenizer = AutoTokenizer.from_pretrained(model_id)
    tokenizer.save_pretrained(model_dir)
    et_model = ExecuTorchModelForCausalLM.from_pretrained(model_dir)
    generated_text = et_model.text_generation(
        tokenizer=tokenizer,
        prompt="Simply put, the theory of relativity states that",
        max_seq_len=64,
    )
    print(f"\nGenerated text:\n\t{generated_text}")
    encoded = tokenizer(generated_text, return_tensors="pt")
    generated_tokens = encoded.input_ids

    # Drop the exported model and tokenizer before the eager model is loaded
    # for the quality check.
    del et_model
    del tokenizer
    gc.collect()

    quality_ok = check_causal_lm_output_quality(model_id, generated_tokens)
    assert quality_ok is True


def test_llm_with_image_modality(
    model_id, model_dir, recipe, *, quantize=True, run_only=False
):
    """Export a multimodal LLM and check text generated from an image prompt.

    Runs the ``multimodal-text-to-text`` export through ``cli_export`` unless
    ``run_only`` is set, feeds a fixed image-plus-question conversation to the
    ExecuTorch ``MultimodalRunner``, and asserts that
    ``check_causal_lm_output_quality`` accepts the generated text.

    Args:
        model_id: HuggingFace model identifier to export and to load the
            tokenizer/processor from.
        model_dir: Directory holding (or receiving) the exported model.
        recipe: Export recipe name.
        quantize: Accepted for signature parity with the other tests; this
            function does not read it and always passes the quantization
            flags to the export command.
        run_only: Skip the export step and only run the exported model.
    """
    export_cmd = ["optimum-cli", "export", "executorch"]
    export_cmd += ["--model", model_id]
    export_cmd += ["--task", "multimodal-text-to-text"]
    export_cmd += ["--recipe", recipe]
    export_cmd += ["--output_dir", model_dir]
    export_cmd += ["--use_custom_sdpa", "--use_custom_kv_cache"]
    export_cmd += ["--qlinear", "8da4w", "--qembedding", "8w"]
    if not run_only:
        cli_export(export_cmd, model_dir)

    tokenizer = AutoTokenizer.from_pretrained(model_id)
    tokenizer.save_pretrained(model_dir)

    # Build the chat input: a system preamble plus one user turn that carries
    # an image URL and a question about it.
    processor = AutoProcessor.from_pretrained(model_id)
    image_url = "https://llava-vl.github.io/static/images/view.jpg"
    system_turn = {
        "role": "system",
        "content": [
            {
                "type": "text",
                "text": "A chat between a curious human and an artificial intelligence assistant. The assistant gives helpful, detailed, and polite answers to the human's questions.",
            }
        ],
    }
    user_turn = {
        "role": "user",
        "content": [
            {"type": "image", "url": image_url},
            {
                "type": "text",
                "text": "What are the things I should be cautious about when I visit here?",
            },
        ],
    }
    inputs = processor.apply_chat_template(
        [system_turn, user_turn],
        add_generation_prompt=True,
        tokenize=True,
        return_dict=True,
        return_tensors="pt",
    )

    from executorch.extension.llm.runner import GenerationConfig, MultimodalRunner

    runner = MultimodalRunner(f"{model_dir}/model.pte", f"{model_dir}/tokenizer.model")
    generation_config = GenerationConfig(max_new_tokens=128, temperature=0, echo=False)
    generated_text = runner.generate_text_hf(
        inputs,
        generation_config,
        processor.image_token_id,
    )
    print(f"\nGenerated text:\n\t{generated_text}")

    # Drop the runner before the eager model is loaded for the quality check.
    del runner
    gc.collect()

    generated_tokens = tokenizer.encode(generated_text, return_tensors="pt")
    quality_ok = check_causal_lm_output_quality(model_id, generated_tokens)
    assert quality_ok is True


def test_fill_mask(model_id, model_dir, recipe, *, quantize=True, run_only=False):
    """Export a masked-LM model and print its top predictions for a masked word.

    Runs the ``fill-mask`` export through ``cli_export`` unless ``run_only``
    is set, then runs the exported model on a fixed sentence containing the
    tokenizer's mask token and prints the five highest-scoring tokens at
    sequence position 4.

    Args:
        model_id: HuggingFace model identifier to export and to load the
            tokenizer from.
        model_dir: Directory holding (or receiving) the exported model.
        recipe: Export recipe name; quantization flags are only added when it
            contains "coreml".
        quantize: Whether to quantize. Asserted to be false unless the recipe
            contains "coreml".
        run_only: Skip the export step and only run the exported model.
    """
    export_cmd = ["optimum-cli", "export", "executorch"]
    export_cmd += ["--model", model_id]
    export_cmd += ["--task", "fill-mask"]
    export_cmd += ["--recipe", recipe]
    export_cmd += ["--output_dir", model_dir]

    coreml_quantized = "coreml" in recipe and quantize
    if not coreml_quantized:
        assert not quantize, "Quantization is not supported for non-CoreML recipes yet"
    else:
        export_cmd += ["--qlinear", "4w", "--qembedding", "8w"]

    if not run_only:
        cli_export(export_cmd, model_dir)

    tokenizer = AutoTokenizer.from_pretrained(model_id)
    et_model = ExecuTorchModelForMaskedLM.from_pretrained(model_dir)
    input_text = f"Paris is the {tokenizer.mask_token} of France."
    encoded = tokenizer(
        input_text,
        return_tensors="pt",
        padding="max_length",
        max_length=10,
    )

    # Run the exported model and decode the top-5 candidates at position 4
    # of the first sequence.
    logits = et_model.forward(encoded["input_ids"], encoded["attention_mask"])
    top_candidates = logits[0, 4].topk(5).indices
    predicted_masks = tokenizer.decode(top_candidates)
    print(f"\nInput text:\n\t{input_text}\nPredicted masks:\n\t{predicted_masks}")


def test_t5(model_id, model_dir, recipe, *, quantize=False, run_only=False):
    """Export T5-small and print a summary generated by the exported model.

    Runs the ``text2text-generation`` export through ``cli_export`` unless
    ``run_only`` is set, then summarizes a fixed news article with the
    exported model. The generated and reference summaries are printed for
    inspection; they are not compared.

    Args:
        model_id: HuggingFace model identifier; asserted to be
            "google-t5/t5-small".
        model_dir: Directory holding (or receiving) the exported model.
        recipe: Export recipe name.
        quantize: Asserted to be false.
        run_only: Skip the export step and only run the exported model.
    """
    assert not quantize, "Quantization is not supported for T5 model yet"

    assert model_id == "google-t5/t5-small"
    command = [
        "optimum-cli",
        "export",
        "executorch",
        "--model",
        model_id,
        "--task",
        "text2text-generation",
        "--recipe",
        recipe,
        "--output_dir",
        model_dir,
    ]
    if not run_only:
        cli_export(command, model_dir)

    # The tokenizer is loaded once and reused for generation below.
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    model = ExecuTorchModelForSeq2SeqLM.from_pretrained(model_dir)
    article = (
        " New York (CNN)When Liana Barrientos was 23 years old, she got married in Westchester County, New York. A"
        " year later, she got married again in Westchester County, but to a different man and without divorcing"
        " her first husband.  Only 18 days after that marriage, she got hitched yet again. Then, Barrientos"
        ' declared "I do" five more times, sometimes only within two weeks of each other. In 2010, she married'
        " once more, this time in the Bronx. In an application for a marriage license, she stated it was her"
        ' "first and only" marriage. Barrientos, now 39, is facing two criminal counts of "offering a false'
        ' instrument for filing in the first degree," referring to her false statements on the 2010 marriage'
        " license application, according to court documents. Prosecutors said the marriages were part of an"
        " immigration scam. On Friday, she pleaded not guilty at State Supreme Court in the Bronx, according to"
        " her attorney, Christopher Wright, who declined to comment further. After leaving court, Barrientos was"
        " arrested and charged with theft of service and criminal trespass for allegedly sneaking into the New"
        " York subway through an emergency exit, said Detective Annette Markowski, a police spokeswoman. In total,"
        " Barrientos has been married 10 times, with nine of her marriages occurring between 1999 and 2002.  All"
        " occurred either in Westchester County, Long Island, New Jersey or the Bronx. She is believed to still be"
        " married to four men, and at one time, she was married to eight men at once, prosecutors say. Prosecutors"
        " said the immigration scam involved some of her husbands, who filed for permanent residence status"
        " shortly after the marriages.  Any divorces happened only after such filings were approved. It was"
        " unclear whether any of the men will be prosecuted. The case was referred to the Bronx District"
        " Attorney's Office by Immigration and Customs Enforcement and the Department of Homeland Security's"
        ' Investigation Division. Seven of the men are from so-called "red-flagged" countries, including Egypt,'
        " Turkey, Georgia, Pakistan and Mali. Her eighth husband, Rashid Rajput, was deported in 2006 to his"
        " native Pakistan after an investigation by the Joint Terrorism Task Force."
    )
    # Prefix the article with the summarization task tag used as the prompt.
    article = "summarize: " + article.strip()

    generated_text = model.text_generation(
        tokenizer=tokenizer,
        prompt=article,
    )
    expected_text = 'a year later, she got married again in westchester county, new york. she was married to a different man, but only 18 days after that marriage. she is facing two criminal counts of "offering a false instrument"'
    print(f"Generated text:\n\t{generated_text}")
    print(f"Expected text:\n\t{expected_text}")


def test_whisper(model_id, model_dir, recipe, *, quantize=False, run_only=False):
    """Export Whisper-tiny and print a transcription from the exported model.

    Runs the ``automatic-speech-recognition`` export through ``cli_export``
    unless ``run_only`` is set, then transcribes the first validation sample
    of the ``distil-whisper/librispeech_long`` dataset. The generated and
    reference transcriptions are printed for inspection; they are not
    compared.

    Args:
        model_id: HuggingFace model identifier; asserted to be
            "openai/whisper-tiny".
        model_dir: Directory holding (or receiving) the exported model.
        recipe: Export recipe name.
        quantize: Asserted to be false.
        run_only: Skip the export step and only run the exported model.
    """
    assert not quantize, "Quantization is not supported for whisper model yet"
    assert model_id == "openai/whisper-tiny"

    export_cmd = ["optimum-cli", "export", "executorch"]
    export_cmd += ["--model", model_id]
    export_cmd += ["--task", "automatic-speech-recognition"]
    export_cmd += ["--recipe", recipe]
    export_cmd += ["--output_dir", model_dir]
    if not run_only:
        cli_export(export_cmd, model_dir)

    tokenizer = AutoTokenizer.from_pretrained(model_id)
    et_model = ExecuTorchModelForSpeechSeq2Seq.from_pretrained(model_dir)
    processor = AutoProcessor.from_pretrained(model_id)
    dataset = load_dataset(
        "distil-whisper/librispeech_long", "clean", split="validation"
    )
    audio = dataset[0]["audio"]

    processed = processor(
        audio["array"],
        return_tensors="pt",
        truncation=False,
        sampling_rate=audio["sampling_rate"],
    )
    input_features = processed.input_features

    # The transcribe method currently accepts at most 30 seconds of audio, so
    # only the first 3000 entries along the last dimension are kept.
    input_features_trimmed = input_features[:, :, :3000].contiguous()

    generated_transcription = et_model.transcribe(tokenizer, input_features_trimmed)
    expected_text = " Mr. Quilter is the apostle of the middle classes, and we are glad to welcome his gospel. Nor is Mr. Quilter's manner less interesting than his matter. He tells us that at this festive season of the year, with Christmas and roast beef looming before us, similarly drawn from eating and its results occur most readily to the mind. He has grave doubts whether Sir Frederick Latins work is really Greek after all, and can discover that."
    print(f"Generated transcription: {generated_transcription}")
    print(f"Expected transcription: {expected_text}")


def test_vit(model_id, model_dir, recipe, *, quantize=False, run_only=False):
    """Export ViT and compare the exported model's logits with the eager model.

    Runs the ``image-classification`` export through ``cli_export`` unless
    ``run_only`` is set, feeds the same random image batch to the exported
    and eager models, and asserts the outputs agree within
    ``atol=1e-02, rtol=1e-02``.

    Args:
        model_id: HuggingFace model identifier; asserted to be
            "google/vit-base-patch16-224".
        model_dir: Directory holding (or receiving) the exported model.
        recipe: Export recipe name.
        quantize: Asserted to be false.
        run_only: Skip the export step and only run the exported model.
    """
    assert not quantize, "Quantization is not supported for ViT models yet."
    assert model_id == "google/vit-base-patch16-224"

    export_cmd = ["optimum-cli", "export", "executorch"]
    export_cmd += ["--model", model_id]
    export_cmd += ["--task", "image-classification"]
    export_cmd += ["--recipe", recipe]
    export_cmd += ["--output_dir", model_dir]
    if not run_only:
        cli_export(export_cmd, model_dir)

    # Random single-image batch shaped from the model config.
    config = AutoConfig.from_pretrained(model_id)
    input_shape = (1, config.num_channels, config.image_size, config.image_size)
    pixel_values = torch.rand(*input_shape)

    # Load the exported model and the eager reference on CPU.
    et_model = ExecuTorchModelForImageClassification.from_pretrained(model_id=model_dir)
    eager_model = AutoModelForImageClassification.from_pretrained(model_id)
    eager_model = eager_model.eval().to("cpu")

    with torch.no_grad():
        eager_output = eager_model(pixel_values)
        et_output = et_model.forward(pixel_values)

    outputs_match = torch.allclose(
        eager_output.logits, et_output, atol=1e-02, rtol=1e-02
    )
    assert outputs_match, "Model output does not match eager"


if __name__ == "__main__":
    # Command-line entry point: map a short model name to its HuggingFace
    # model id and test function, then run that test against either the
    # user-supplied --model_dir or a temporary directory.
    parser = argparse.ArgumentParser()
    parser.add_argument("--model", type=str, required=True)
    parser.add_argument("--recipe", type=str, required=True)
    parser.add_argument("--quantize", action="store_true", help="Enable quantization")
    parser.add_argument(
        "--model_dir",
        type=str,
        required=False,
        help="When provided, write the pte file to this directory. Otherwise, a temporary directory is created for the test.",
    )
    parser.add_argument(
        "--run_only", action="store_true", help="Skip export and only run the test"
    )
    args = parser.parse_args()

    _text_generation_mapping = {
        "llama3.2-1b": ("NousResearch/Llama-3.2-1B", test_text_generation),
        "qwen3-0.6b": ("Qwen/Qwen3-0.6B", test_text_generation),
        "qwen3-1.7b": ("Qwen/Qwen3-1.7B", test_text_generation),
        "gemma3-1b": (
            "unsloth/gemma-3-1b-it",
            test_text_generation,
        ),  # does not export for CoreML
        "phi4-mini": (
            "microsoft/Phi-4-mini-instruct",
            test_text_generation,
        ),  # fails to lower for CoreML
        "smollm2-135m": ("HuggingFaceTB/SmolLM2-135M", test_text_generation),
        "smollm3-3b": ("HuggingFaceTB/SmolLM3-3B", test_text_generation),
        "olmo-1b": ("allenai/OLMo-1B-hf", test_text_generation),
    }

    _mask_fill_mapping = {
        "bert": ("google-bert/bert-base-uncased", test_fill_mask),
        # Was "FacebookAI/xlmcl-roberta-base", which appears to be a typo for
        # the repository name below.
        "roberta": ("FacebookAI/xlm-roberta-base", test_fill_mask),
        "distilbert": ("distilbert/distilbert-base-uncased", test_fill_mask),
    }

    _misc_model_mapping = {
        "whisper": ("openai/whisper-tiny", test_whisper),
        "t5": ("google-t5/t5-small", test_t5),  # CoreML runtime failure
        "vit": ("google/vit-base-patch16-224", test_vit),
    }

    _multimodal_model_mapping = {
        "gemma3-4b": ("google/gemma-3-4b-it", test_llm_with_image_modality),
        "llava": ("llava-hf/llava-1.5-7b-hf", test_llm_with_image_modality),
    }

    # Short names are unique across the groups, so a plain dict union is enough.
    model_to_model_id_and_test_function = (
        _text_generation_mapping
        | _mask_fill_mapping
        | _misc_model_mapping
        | _multimodal_model_mapping
    )

    if args.model not in model_to_model_id_and_test_function:
        raise ValueError(
            f"Unknown model name: {args.model}. Available models: {model_to_model_id_and_test_function.keys()}"
        )

    model_id, test_fn = model_to_model_id_and_test_function[args.model]
    # The temporary directory is only used when --model_dir is not given.
    with tempfile.TemporaryDirectory() as tmp_dir:
        test_fn(
            model_id=model_id,
            model_dir=tmp_dir if args.model_dir is None else args.model_dir,
            recipe=args.recipe,
            quantize=args.quantize,
            run_only=args.run_only,
        )
